#!/usr/bin/env perl

############################################################################           
#  Licensed to the Apache Software Foundation (ASF) under one or more                  
#  contributor license agreements.  See the NOTICE file distributed with               
#  this work for additional information regarding copyright ownership.                 
#  The ASF licenses this file to You under the Apache License, Version 2.0             
#  (the "License"); you may not use this file except in compliance with                
#  the License.  You may obtain a copy of the License at                               
#                                                                                      
#      http://www.apache.org/licenses/LICENSE-2.0                                      
#                                                                                      
#  Unless required by applicable law or agreed to in writing, software                 
#  distributed under the License is distributed on an "AS IS" BASIS,                   
#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.            
#  See the License for the specific language governing permissions and                 
#  limitations under the License.                                                      
                                                                                       
###############################################################################
# Tests for pig streaming.
#
# This configuration file follows streaming functional spec: http://wiki.apache.org/pig/PigStreamingFunctionalSpec

$cfg = {
        'driver' => 'Pig',
        'nummachines' => 5,

        'groups' => [
		{
		# This group is for local mode testing
		'name' => 'StreamingLocal',
                'sortBenchmark' => 1,
                'sortResults' => 1,
                'floatpostprocess' => 1,
                'delimiter' => '	',
                'tests' => [
			{
		        #Section 1.1: perl script, no parameters
                        'num' => 1,
                        'execonly' => 'local',
                        'pig' => q#
A = load ':INPATH:/singlefile/studenttab10k';
B = foreach A generate $0, $1, $2;
C = stream B through `perl :SCRIPTHOMEPATH:/PigStreaming.pl`;
store C into ':OUTPATH:';#,
			'sql' => "select name, age, gpa from studenttab10k;",
			},
			{
		        #Section 1.3: define clause; perl script, with parameters
			'num' => 2,
			'execonly' => 'local',
                        'pig' => q#
define CMD `perl :SCRIPTHOMEPATH:/PigStreaming.pl - -`;
A = load ':INPATH:/singlefile/studenttab10k';
B = foreach A generate $0, $1, $2;
C = stream B through CMD;
store C into ':OUTPATH:';#,
                        'sql' => "select name, age, gpa from studenttab10k;",
			},
			{
                        # Section 1.4: grouped data
                        'num' => 3,
                        'execonly' => 'local',
                        'pig' => q#
define CMD `perl :SCRIPTHOMEPATH:/GroupBy.pl '\t' 0` ship(':SCRIPTHOMEPATH:/GroupBy.pl');
A = load ':INPATH:/singlefile/studenttab10k';
B = group A by $0;
C = foreach B generate flatten(A);
D = stream C through CMD;
store D into ':OUTPATH:';#,
                        'sql' => "select name, count(*) from studenttab10k group by name;",
                        },
			{
                        # Section 1.4: grouped and ordered data
                        'num' => 4,
                        'execonly' => 'local',
                        'pig' => q#
define CMD `perl :SCRIPTHOMEPATH:/GroupBy.pl '\t' 0 1`;
A = load ':INPATH:/singlefile/studenttab10k';
B = group A by $0;
C = foreach B {
        D = order A by $1;
        generate flatten(D);
};
E = stream C through CMD;
store E into ':OUTPATH:';#,
                        'sql' => "select name, age, count(*) from studenttab10k group by name, age;",
                        },
			{
			# Section 1.5: multiple streaming operators - adjacent - before local rearrange
                        'num' => 5,
                        'execonly' => 'local',
                        'pig' => q#
register :FUNCPATH:/testudf.jar;                        
define CMD `perl :SCRIPTHOMEPATH:/PigStreamingDepend.pl` input(stdin using org.apache.pig.test.udf.streaming.StreamingDump);
A = load ':INPATH:/singlefile/studenttab10k';
B = stream A through `perl :SCRIPTHOMEPATH:/PigStreaming.pl`;
C = stream B through CMD as (name, age, gpa);
D = foreach C generate name, age;
store D into ':OUTPATH:';#,
                        'sql' => "select name, age from studenttab10k;",
			},
			{
                        # Section 1.5: multiple streaming operators - not adjacent - before local rearrange
                        'num' => 6,
                        'execonly' => 'local',
                        'pig' => q#
register :FUNCPATH:/testudf.jar;                        
A = load ':INPATH:/singlefile/studenttab10k';
define CMD `perl :SCRIPTHOMEPATH:/PigStreamingDepend.pl` input(stdin using org.apache.pig.test.udf.streaming.StreamingDump);
B = stream A through CMD as (name, age, gpa);
C = filter B by age < '20';
D = foreach C generate name;
define CMD `perl :SCRIPTHOMEPATH:/PigStreaming.pl - - :SCRIPTHOMEPATH:/nameMap`;
E = stream D through CMD;
store E into ':OUTPATH:';#,
                        'sql' => "select UPPER(name) from studenttab10k where age < '20';",
                        },		
			{
                        # Section 1.5: multiple streaming operators - adjacent - after local rearrange
                        'num' => 7,
                        'execonly' => 'local',
                        'pig' => q#
register :FUNCPATH:/testudf.jar;                        
define CMD1 `perl :SCRIPTHOMEPATH:/GroupBy.pl '\t' 0 1`;
define CMD2 `perl :SCRIPTHOMEPATH:/PigStreamingDepend.pl` input(stdin using org.apache.pig.test.udf.streaming.StreamingDump);
A = load ':INPATH:/singlefile/studenttab10k';
B = group A by $0;
C = foreach B {
        D = order A by $1;
        generate flatten(D);
};
E = stream C through CMD1;
F = stream E through CMD2;
store F into ':OUTPATH:';#,
                        'sql' => "select name, age, count(*) from studenttab10k group by name, age;",
                        },		
			{
                        # Section 1.5: multiple streaming operators - one before and one after local rearrange
                        # same alias name
                        'num' => 8,
                        'execonly' => 'local',
                        'pig' => q#
register :FUNCPATH:/testudf.jar;                        
define CMD1 `perl :SCRIPTHOMEPATH:/GroupBy.pl '\t' 0`;
define CMD2 `perl :SCRIPTHOMEPATH:/PigStreamingDepend.pl` input(stdin using org.apache.pig.test.udf.streaming.StreamingDump);
A = load ':INPATH:/singlefile/studenttab10k';
B = stream A through CMD2;
C = group B by $0;
D = foreach C generate flatten(B);
B = stream D through CMD1;
store B into ':OUTPATH:';#,
                        'sql' => "select name, count(*) from studenttab10k group by name;",
                        },
			{ 
                        # Section 3.1: use of custom deserializer
                        'num' => 9,
                        'execonly' => 'local',
                        'pig' => q#
define CMD `perl :SCRIPTHOMEPATH:/PigStreaming.pl` output(stdout using PigStreaming());
A = load ':INPATH:/singlefile/studenttab10k';
B = stream A through CMD;
store B into ':OUTPATH:';#,
                        'sql' => "select name, age, gpa from studenttab10k;",
                        },
                        {
                        # Section 3.1: use of custom serializer and deserializer
                        'num' => 10,
                        'execonly' => 'local',
                        'pig' => q#
register :FUNCPATH:/testudf.jar;
define CMD `perl :SCRIPTHOMEPATH:/PigStreaming.pl` input(stdin  using org.apache.pig.test.udf.streaming.StreamingDump) output(stdout using org.apache.pig.test.udf.streaming.DumpStreamer);
A = load ':INPATH:/singlefile/studenttab10k';
B = stream A through CMD as (name, age, gpa);
C = foreach B generate name, age;
store C into ':OUTPATH:';#,
                        'sql' => "select name, age from studenttab10k;",
                        },
                        {
                        # Section 3.3: streaming application reads from file rather than stdin
                        'num' => 11,
                        'execonly' => 'local',
                        'pig' => q#
define CMD `perl :SCRIPTHOMEPATH:/PigStreaming.pl foo -` input('foo');
A = load ':INPATH:/singlefile/studenttab10k';
B = stream A through CMD;
store B into ':OUTPATH:';#,
                        'sql' => "select name, age, gpa from studenttab10k;",
                        },
                        {
                        # Section 3.4: streaming application writes single output to a file
                        'num' => 12,
                        'execonly' => 'local',
                        'pig' => q#
define CMD `perl :SCRIPTHOMEPATH:/PigStreaming.pl - foo :SCRIPTHOMEPATH:/nameMap` output('foo' using PigStreaming);
A = load ':INPATH:/singlefile/studenttab10k';
B = foreach A generate $0;
C = stream B through CMD;
store C into ':OUTPATH:';#,
                        'sql' => "select upper(name) from studenttab10k;",
                        },
                        {
                        # Section 3.4: streaming application writes multiple outputs to file
                        'num' => 13,
                        'execonly' => 'local',
                        'pig' => q#
register :FUNCPATH:/testudf.jar;
define CMD `perl :SCRIPTHOMEPATH:/PigStreamingDepend.pl - sio_5_1 sio_5_2` input(stdin using org.apache.pig.test.udf.streaming.StreamingDump) output('sio_5_1', 'sio_5_2');
A = load ':INPATH:/singlefile/studenttab10k';
B = stream A through CMD;
store B into ':OUTPATH:';#,
                        'sql' => "select name, age, gpa from studenttab10k;",
                        },
			{
                        # Section 3.4: streaming application writes multiple outputs: 1 to file and 1 to stdout
                        'num' => 14,
                        'execonly' => 'local',
                        'pig' => q#
register :FUNCPATH:/testudf.jar;
define CMD `perl :SCRIPTHOMEPATH:/PigStreamingDepend.pl - - sio_5_2` input(stdin using org.apache.pig.test.udf.streaming.StreamingDump) output(stdout, 'sio_5_2');
A = load ':INPATH:/singlefile/studenttab10k';
B = stream A through CMD;
store B into ':OUTPATH:';#,
                        'sql' => "select name, age, gpa from studenttab10k;",
                        },
			{
                        # Section 4.3: integration with parameter substitition
                        'num' => 15,
                        'execonly' => 'local',
                        'pig_params' => ['-p', qq(script_name='PigStreaming.pl')],
                        'pig' => q#
define CMD `perl :SCRIPTHOMEPATH:/$script_name - - :SCRIPTHOMEPATH:/nameMap`;
A = load ':INPATH:/singlefile/studenttab10k';
B = foreach A generate $0;
C = stream B through CMD as (name);
D = group C by name;
E = foreach D generate group, COUNT(C);
store E into ':OUTPATH:';#,
                        'sql' => "select upper(name) as nm, count(*) from studenttab10k group by nm;",
                        },
			{
                        # Section 5.1: load/store optimization
                        'num' => 16,
                        'execonly' => 'local',
                        'pig' => q#
define CMD `perl :SCRIPTHOMEPATH:/PigStreaming.pl`;
A = load ':INPATH:/singlefile/studenttab10k';
C = stream A through CMD;
store C into ':OUTPATH:';#,
                        'sql' => "select name, age, gpa from studenttab10k;",
                        },
			{
			# PIG-272: problem with optimization and intermediate store
			'num' => 17,
			'execonly' => 'local',
			'pig' => q#
define CMD1 `perl -ne 'print $_;print STDERR "stderr $_";'`; 
define CMD2 `:SCRIPTHOMEPATH:/Split.pl 3` input(stdin using PigStreaming(',')); 
A = load ':INPATH:/singlefile/studenttab10k';
B = stream A through CMD1;
C = stream B through CMD1;
D = stream C through CMD2;
store D into ':OUTPATH:';#,
                        'sql' => "select name, age, gpa from studenttab10k;",	
			},
			{
			# PIG-272: problem with optimization and intermediate store
			'num' => 18,
			'execonly' => 'local',
			'pig' => q#
define CMD1 `perl -ne 'print $_;'`; 
define CMD2 `:SCRIPTHOMEPATH:/Split.pl 3` input(stdin using PigStreaming(',')); 
A = load ':INPATH:/singlefile/studenttab10k';
B = stream A through CMD1;
store B into ':OUTPATH:.intermediate';
C = stream B through CMD1;
D = stream C through CMD2;
E = JOIN B by $0, D by $0;
store E into ':OUTPATH:';#,
                        'notmq' => 1,
                        'sql' => "select A.name, A.age, A.gpa, B.name, B.age, B.gpa from studenttab10k as A join studenttab10k as B using(name);",	
			},
			]
		},
	]
}
;
                
